---
layout: post
date: 2024-10-30
title: "TCP Server in Zig - Part 8 - Epoll & Kqueue"
description: "Abstracitng epoll & kqueue, and using them with multi-threading"
tags: [zig]
---

<p>Now that we're more familiar with <code>epoll</code> and <code>kqueue</code> individually, it's time to bring everything together. We'll begin by looking at the possible interaction between evented I/O and threads and then look at writing a basic cross-platform abstraction over the platform-specific epoll and kqueue.</p>

<h3 id=evented_multithreading><a href="#evented_multithreading" aria-hidden=true>Evented I/O + Multithreading</a></h3>
<p>We began our journey with a limited single-threaded loop, and then replaced it with a multithreaded version. However, when we looked at using evented I/O, first via <code>poll</code> and then <code>epoll</code> and <code>kqueue</code>, all of our examples reverted to being single-threaded. This might make you believe that evented I/O and multithreading are mutually exclusive ideas, but they aren't. The main reason we've only discussed evented I/O in the context of a single thread is that we had enough new things to learn without the added complexity of multithreading. Another reason is that there are different ways to weave evented I/O and multithreading together, and which you pick will largely depend on your specific requirement.</p>

<p>In our last complete implementation, we iterated through sockets which were ready, read and echo'd back messages. Basically, we did:</p>

{% highlight zig %}
if (filter == system.EVFILT.READ) {
    // this socket is ready to be read
    while (true) {
        const msg = client.readMessage() catch {
            self.closeClient(client);
            break;
        } orelse break;   // no more messages

        client.writeMessage(msg) catch {
            self.closeClient(client);
            break;
        };
    }
}
{% endhighlight %}

<p>In a real application, we'll probably want to "process" the incoming message. That processing might involve external I/O or heavy computation. If we do that in the same thread which is waiting for readiness and reading and writing messages, we're going to have similar scaling challenges as our simple single-threaded server. What we might want to do instead is to launch a thread (probably via a thread-pool), to process the message:</p>

{% highlight zig %}
if (filter == system.EVFILT.READ) {
    while (true) {
        const msg = client.readMessage() catch {
            self.closeClient(client);
            break;
        } orelse break;

        // process the client message in a separate thread
        try thread_pool.spawn(Client.process, .{client, msg});
    }
}
{% endhighlight %}


<p>As a slight alternative, we could even move the call to <code>client.readMessage</code> to the thread:</p>

{% highlight zig %}
if (filter == system.EVFILT.READ) {
    try thread_pool.spawn(Client.process, .{client, msg});
}
{% endhighlight %}

<p>Adding a thread pool seems easy but we've introduced bugs. While it's more obvious in the first example, both approaches can result in processing concurrent messages for the same client. Neither our <code>Client</code> nor <code>Reader</code> are thread safe. There are a number of possible solutions. We could make <code>Client</code> thread-safe. Making <code>Reader</code> thread-safe is going to be much more difficult though - we need a distinct buffer and position information per message. Even in the first example, where it seems like <code>Reader</code> might not need to be thread-safe, we have bugs. Remember that <code>msg</code> references <code>client.reader.buf</code>. If we want <code>msg</code> to remain valid through the thread's processing of the message, we'll have to clone it - a pretty big compromise.</p>

<aside><p>Zig's built-in <code>std.Thread.Pool</code> isn't the most efficient because it allocates memory to <a href=/Closures-in-Zig/>create a closure</a>. If you're planning on spawning hundreds of jobs per second, you <a href=/Writing-a-Task-Scheduler-in-Zig/>can come up with a more memory-efficient design</a>.</p></aside>

<p>We could enhance our thread-pool so that a given client is always passed to the same thread. This serializes the processing of all messages for a client - it's similar to how an actor works in Erlang / Elixir. But that has its own consequences - system resources might not be utilized, and a "heavy" client can adversely impact other clients which are assigned to the same thread.</p>

<p>Another option is to use <code>EPOLL.ONESHOT</code> and <code>EV.DISPATCH</code> to limit the number concurrent messages we have per client to one. The downside with this approach is the extra system call needed to re-arm the notification once we're ready to process the next message.</p>

<p>Each solution has trade-offs. If you expect few clients (a couple hundred) and processing is lightweight, you might not even need multithreading. If messages are infrequent, then the cost of re-arming a disabled notification might be insignificant. If you're able to customize a thread pool for your needs, maybe assigning each client a "weight" or "group" to better distribute load, then you might be able to use your thread pool to serialize message processing.</p>

<p>This series won't explore any of these solutions in greater detail - they're too application-specific and aren't directly related to network programming. But, both <code>epoll</code> and <code>kqueue</code> have two additional features we should talk about.</p>

<h3 id=thread_safety><a href="#thread_safety" aria-hidden=true>Epoll & Kqueue Thread-Safety</a></h3>
<p>Both <code>epoll</code> and <code>kevent</code> are thread-safe. Specifically, if you're blocked on a call to <code>epoll_wait</code> or <code>kevent</code> you can make changes to the notification list from another thread using <code>epoll_ctl</code> or <code>kevent</code>. So if we did use <code>EPOLL.ONESHOT</code> or <code>EV.DISPATCH</code> and processed the message in a separate thread, we could re-arm the notification within that thread. Re-arming will always require that extra system call, but being able to do it from the thread provides an easy way to ensure we only process one message per client at a time.</p>

<p>There isn't much more to say about this, but it's such an important behavior that I wanted to highlight it in its own section. I will point out that <code>epoll</code>'s documentation is unambiguous about this, but I couldn't find anything definitive for <code>kqueue</code>. My own testing shows that this works as expected and online comments mention that <code>kqueue</code> is thread-safe, but it the man pages doesn't mention anything.</p>

<h3 id=unblocking-pipe><a href="#unblocking-pipe" aria-hidden=true>Unblocking With Pipe</a></h3>
<p>You might find yourself waiting to unblock the call to <code>epoll_wait</code> or <code>kevent</code>. A simple use-case would be to initiate a shutdown. A lazy solution is to rely on a short timeout and then check a condition, something like this:</p>

{% highlight zig %}
// never wait for than 1 second.
const ready_events = try self.loop.wait(1000);
if (@atomicLoad(bool, &self.shutdown, .acquire)) {
    return;
}
{% endhighlight %}

<p>But, I always find this an awkward compromise between having a wastefully short timeout and a unusable long one.</p>

<p>One option which works on both Linux and BSD/MacOS is to create a pipe. From our point of view, this behaves like a socket - we monitor one end of the pipe and can write to the other:</p>

{% highlight zig %}
const std = @import("std");
const net = std.net;
const posix = std.posix;
const linux = std.os.linux;

pub fn main() !void {
    // epoll_create1 takes flags. We aren't using any in these examples
    const efd = try posix.epoll_create1(0);
    defer posix.close(efd);

    const pipe = try posix.pipe();
    defer posix.close(pipe[0]);

    {
        // monitor one end of the pipe
        var event = linux.epoll_event{.events = linux.EPOLL.IN, .data = .{.fd = pipe[0]}};
        try posix.epoll_ctl(efd, linux.EPOLL.CTL_ADD, pipe[0], &event);
    }

    const thread = try std.Thread.spawn(.{}, shutdown, .{pipe[1]});
    thread.detach();

    var ready_list: [16]linux.epoll_event = undefined;
    while (true) {
        const ready_count = posix.epoll_wait(efd, &ready_list, -1);
        for (ready_list[0..ready_count]) |ready| {
            const ready_socket = ready.data.fd;
            if (ready_socket == pipe[0]) {
                std.debug.print("shutting down", .{});
                return;
            }
        }
    }
}

fn shutdown(signal: posix.socket_t) void {
    std.time.sleep(std.time.ns_per_s * 5);
    posix.close(signal);
}
{% endhighlight %}

<p>I've removed all of the other socket code to keep the example lean. The point is to show how to create a pipe, and how we can monitor one end, while manipulating the other from a separate thread. Our thread closes the pipe, but we could just have easily written to it. However, as far as I know, it is technically possible to get a partial write to a pipe (just like a socket). So if you're writing more than one byte to the pipe from multiple threads, you'll need to synchronize the writes (although, a partial write to a pipe might be so rare that you don't have to worry about it...I don't know).</p>

<h3 id=eventfd><a href="#eventfd" aria-hidden=true>Unblocking With Eventfd</a></h3>
<p>While <code>pipe</code> is fine for a one-time only signal like shutdown, and can be be made to work for more complex cases, both <code>epoll</code> and <code>kqueue</code> have specific mechanisms that we can leverage. For <code>epoll</code> we can use an <code>eventfd</code>:</p>

{% highlight zig %}
const std = @import("std");
const net = std.net;
const posix = std.posix;
const linux = std.os.linux;

pub fn main() !void {
    const efd = try posix.epoll_create1(0);
    defer posix.close(efd);

    const signal = try posix.eventfd(0, 0);
    errdefer posix.close(signal);

    {
        // monitor the signal
        var event = linux.epoll_event{.events = linux.EPOLL.IN | linux.EPOLL.ET, .data = .{.fd = signal}};
        try posix.epoll_ctl(efd, linux.EPOLL.CTL_ADD, signal, &event);
    }

    const thread = try std.Thread.spawn(.{}, notify, .{signal});
    thread.detach();

    var ready_list: [16]linux.epoll_event = undefined;
    while (true) {
        const ready_count = posix.epoll_wait(efd, &ready_list, -1);
        for (ready_list[0..ready_count]) |ready| {
            const ready_socket = ready.data.fd;
            if (ready_socket == signal) {
                std.debug.print("got notification\n", .{});
            }
        }
    }
}

fn notify(signal: posix.fd_t) void {
    for (0..5) |_| {
        std.time.sleep(std.time.ns_per_s);
        const value: usize = 1;
        _ = posix.write(signal, std.mem.asBytes(&value)) catch break;
    }
}
{% endhighlight %}

<p>By default, an <code>eventfd</code> is actually an 8-byte integer value. The value we <code>write</code> to it, <code>1</code> above, increments the counter. We can <code>posix.read</code> the eventfd to get the value and reset the counter. The reason we don't <code>read</code> in the above code is because we used <code>EPOLL.ET</code>. We looked at edge-trigger before, but as a recap, if we didn't use <code>EPOLL.ET</code> and we didn't <code>read</code>, a single write would result in endless notifications. With <code>EPOLL.ET</code>, we're only notified once per <code>write</code>, which is exactly what we want. An interesting thing about reading from an <code>eventfd</code> is that the read will always read 8 bytes (provided you give it an 8-byte buffer) or error - it can't under-read like a socket.</p>

<p>In addition to working as a signal, like we did above with <code>EPOLL.ET</code>, or as a counter - requiring a <code>posix.read</code> to get the value and reset it - an <code>eventfd</code> can also act like a semaphore. We created our <code>evenfd</code> using <code>posix.eventfd(0, 0)</code>. The first parameter is the initial counter value. We set it to <code>0</code> because we're using it as a signaling mechanism, not a counter, so we don't care about the value. The second parameter is for flags. We could have specified <code>linux.EFD.SEMAPHORE</code> which would have turned our <code>eventfd</code> into a semaphore. In this mode <code>read</code> always decrements the counter by <code>1</code>, and blocks if the counter is <code>0</code>. If we OR'd the <code>linux.EFD.NONBLOCK</code> flag, then <code>read</code> would return <code>error.WouldBlock</code> rather than blocking.</p>

<p>You might never use the counter or semaphore behaviors of <code>eventfd</code>, but using it as a signal, especially when paired with <code>EPOLL.ET</code> is a simple way to unblock an <code>epoll_wait</code> from a separate thread.</p>

<h3 id=evfilt_user><a href="#evfilt_user" aria-hidden=true>Unblocking With EVFILT.USER</a></h3>
<p><code>kqueue</code> has its own special mechanism for signaling: user filters. Unlike <code>epoll</code>, we don't have to set anything up:</p>

{% highlight zig %}
const std = @import("std");
const net = std.net;
const posix = std.posix;

pub fn main() !void {
    const kfd = try posix.kqueue();
    defer posix.close(kfd);

    const thread = try std.Thread.spawn(.{}, notify, .{kfd});
    thread.detach();

    var ready_list: [15]posix.Kevent = undefined;
    while (true) {
        const ready_count = try posix.kevent(kfd, &.{}, &ready_list, null);
        for (ready_list[0..ready_count]) |ready| {
            const ready_data = ready.udata;
            if (ready_data == 0) {
                std.debug.print("got notification\n", .{});
            }
        }
    }
}

fn notify(kfd: posix.fd_t) void {
    for (0..5) |_| {
        std.time.sleep(std.time.ns_per_s);
        _ = posix.kevent(kfd, &.{.{
            .ident = 1,
            .filter = posix.system.EVFILT.USER,
            .flags = posix.system.EV.ADD | posix.system.EV.CLEAR,
            .fflags = posix.system.NOTE.TRIGGER,
            .data = 0,
            .udata = 0,
        }}, &.{}, null) catch @panic("TODO");
    }
}
{% endhighlight %}

<p>Instead of using a <code>filter</code> of <code>EVFILT.READ</code> or <code>EVFILT.WRITE</code>, as in previous example, we're using <code>EVFILT.USER</code>. The other important part is setting the <code>EV.CLEAR</code> flag to ensure that the notification only triggers once. Setting the <code>fflags</code> to <code>NOTE.TRIGGER</code> is what actually triggers  the wakeup.</p>

<p>You might be wondering about the <code>ident</code> field set to <code>1</code>. In our <a href=/TCP-Server-In-Zig-Part-7-Kqueue/>Kqueue part</a> we mentioned that a monitor is identified by its <code>ident</code> and <code>filter</code> fields. Therefore, while we might add another monitor to something with an <code>ident</code> of <code>1</code> (like a file descriptor), that would have a different <code>filter</code>, likely <code>EV.READ</code> or <code>EV.WRITE</code>.</p>

<p>An <code>EVFILT.USER</code> supports special values for the <code>fflags</code> field. We're using <code>NOTE.TRIGGER</code> which is what activates the notification. But we can also store arbitrary data in the lower 24-bits of <code>fflag</code> and can manipulate the value by setting special values, like <code>NOTE.FFAND</code>, to bitwise-AND the existing <code>fflags</code> value with the one we specify. I'm have no idea how / where this feature is being used, but it's there.</p>

<h3 id=multithreading_accept><a href="#multithreading_accept" aria-hidden=true>Multithreading Accept</a></h3>
<p><code>eventfd</code> and <code>EVFLT.USER</code> are useful thread-safe mechanisms for signalling our accept loop. If you do add a thread pool to process messages, you might find yourself needing to unblock the main thread's <code>epoll_wait</code> or <code>kevent</code> from within the spawned threads. And the reason you want a thread pool is to keep your accept loop lean and free to react to socket readiness.</p>

<p>However, at very large scale, where you might see tens of thousands of clients trying to connect per second, even the leanest accept loop can become a bottleneck. One option is to have multiple threads accepting on the same socket. This mostly works as you'd hope, but there are a few changes we'll want to make. First, the basic code, which works but isn't as efficient as we can make:</p>

{% highlight zig %}
const std = @import("std");
const net = std.net;
const posix = std.posix;

pub fn main() !void {
    const address = try std.net.Address.parseIp("127.0.0.1", 5882);

    const tpe: u32 = posix.SOCK.STREAM;
    const protocol = posix.IPPROTO.TCP;
    const listener = try posix.socket(address.any.family, tpe, protocol);
    defer posix.close(listener);

    try posix.setsockopt(listener, posix.SOL.SOCKET, posix.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1)));
    try posix.bind(listener, &address.any, address.getOsSockLen());
    try posix.listen(listener, 128);

    var listeners: [2]std.Thread = undefined;
    for (&listeners, 0..) |*l, id| {
        l.* = try std.Thread.spawn(.{}, accept, .{listener, id});
    }

    for (listeners) |l| {
        l.join();
    }
}
fn accept(listener: posix.socket_t, id: usize) !void {
    while (true) {
        var client_address: net.Address = undefined;
        var client_address_len: posix.socklen_t = @sizeOf(net.Address);

        const socket = posix.accept(listener, &client_address.any, &client_address_len, 0) catch |err| {
            // Rare that this happens, but in later parts we'll
            // see examples where it does.
            std.debug.print("error accept: {}\n", .{err});
            continue;
        };
        defer posix.close(socket);

        std.debug.print("[{d}] {} connected\n", .{id, client_address});

        write(socket, "Hello (and goodbye)") catch |err| {
            // This can easily happen, say if the client disconnects.
            std.debug.print("error writing: {}\n", .{err});
        };
    }
}

fn write(socket: posix.socket_t, msg: []const u8) !void {
    var pos: usize = 0;
    while (pos < msg.len) {
        const written = try posix.write(socket, msg[pos..]);
        if (written == 0) {
            return error.Closed;
        }
        pos += written;
    }
}
{% endhighlight %}

<p>I won't hold it against you for not noticing, but this is almost identical to our very first implementation from part 1. The only difference is that we've extracted the accept loop into its own function and launched multiple threads to accept connections.</p>

<p>One issue with the above is that the O/S might not distribute new connection very well. At a small scale, this is something you might not notice. For example, when I run the above and try to connect multiple clients, I get:</p>

{% highlight text %}
[0] 127.0.0.1:49284 connected
[1] 127.0.0.1:49285 connected
[0] 127.0.0.1:49286 connected
[1] 127.0.0.1:49287 connected
[0] 127.0.0.1:49288 connected
[1] 127.0.0.1:49289 connected
[0] 127.0.0.1:49290 connected
{% endhighlight %}

<p>So while it might not seem that we have a balancing problem, under load, we might. This is easy to fix: set the <code>SO.REUSEPORT</code> socket option:</p>

{% highlight zig %}
// We keep the existing SO.REUSEADDR
try posix.setsockopt(listener, posix.SOL.SOCKET, posix.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1)));

// We add SO.REUSEPORT
try posix.setsockopt(listener, posix.SOL.SOCKET, posix.SO.REUSEPORT, &std.mem.toBytes(@as(c_int, 1)));
{% endhighlight %}

<p>There's a bit more to this story. <code>SO.REUSEPORT</code> is originally a BSD option which allows multiple processes to bind to the same address. When the option was added to Linux, load balancing was included. In the name of backwards compatibility (I assume), BSD didn't add load balancing as a feature to their <code>SO.REUSEPORT</code>, but rather added a different option: <code>SO.REUSEPORT_LB</code>. The simple solution is to see what's defined and use that:</p>

{% highlight zig %}
// We keep the existing SO.REUSEADDR
try posix.setsockopt(listener, posix.SOL.SOCKET, posix.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1)));

// Add...
if (@hasDecl(posix.SO, "REUSEPORT_LB")) {
    // if available, use SO.REUSEPORT_LB
    try posix.setsockopt(listener, posix.SOL.SOCKET, posix.SO.REUSEPORT_LB, &std.mem.toBytes(@as(c_int, 1)));
} else if (@hasDecl(posix.SO, "REUSEPORT")) {
    // else, if available use REUSEPORT
    try posix.setsockopt(listener, posix.SOL.SOCKET, posix.SO.REUSEPORT, &std.mem.toBytes(@as(c_int, 1)));
}
{% endhighlight %}

<p>Having multiple threads accepting connections doesn't mean that we can't also have a thread pool to process client messages. They're solving different problems. Here we're trying to make sure our accept-loop itself isn't a bottleneck, whereas our above thread pools are trying to make sure our event loop doesn't block while processing a client message.</p>

<h3 id=wakeups><a href="#wakeups" aria-hidden=true>Unnecessary Wakeups</a></h3>
<p>As we discovered above, one of the neat features of both <code>epoll</code> and <code>kqueue</code> is that they're thread-safe. We can modify an instance from different threads, and those changes will be picked up even by a blocked <code>epoll_wait</code> or <code>kevent</code>. It's great.</p>

<p>But consider our multi-threaded accept loop. That simple example was using a blocking listening socket - would it still work with <code>epoll</code> and <code>kqueue</code> using non-blocking sockets? The short answer is yes. The longer answer is that when multiple threads register their interest in a socket, every thread will be notified of readiness. For example, imagine our listening socket is in non-blocking mode, and our <code>accept</code> function sets up an epoll instance:</p>

{% highlight zig %}
fn accept(listener: posix.socket_t, id: usize) !void {
    const efd = try posix.epoll_create1(0);
    defer posix.close(efd);

    {
        // monitor our listening socket
        var event = linux.epoll_event{.events = linux.EPOLL.IN, .data = .{.fd = listener}};
        try posix.epoll_ctl(efd, linux.EPOLL.CTL_ADD, listener, &event);
    }

    // rest of our normal accept loop
}
{% endhighlight %}

<p>We now have multiple threads monitoring the same socket. This is perfectly legal, but it isn't efficient. When a socket becomes ready, in this case when one or more client connects, all of the threads will be notified. To deal with this, we have two options. The first is to do nothing, our code already swallows <code>error.WouldBlock</code> on <code>accept</code>. We ignore this error because we accept in a loop, accepting until there are no more new connection. Now this error is going to be more frequent since multiple threads will be competing to accept connections. Like I said, it isn't efficient, but it works.</p>

<p>On new-ish versions of linux, we can set <code>events</code> to <code>linux.EPOLL.IN | linux.EPOLL.EXCLUSIVE</code>. This will limit the number of threads that will be notified. The documentation says that "one or more" threads will be notified. I interpret this to mean: mostly one thread will be notified, but we make no guarantees, so program defensively. And that's fine since we're already ignoring <code>error.WouldBlock</code>.</p>

<p>For BSD, I'm not aware of a solution that's as simple and effective as <code>EPOLL.EXCLUSIVE</code>. As far as I know, the only way to avoid this is to use <code>EV.ONESHOT</code> (discussed briefly in <a href=/TCP-Server-In-Zig-Part-7-Kqueue/#edge_triggered>part 7</a>) and then re-arm the notification. Here's the relevant change to <code>KQueue.accept</code>:</a>

{% highlight zig %}
fn accept(self: *Server, listener: posix.socket_t) !void {
    const space = self.max - self.connected;
    for (0..space) |_| {
        var address: net.Address = undefined;
        var address_len: posix.socklen_t = @sizeOf(net.Address);
        const socket = posix.accept(listener, &address.any, &address_len, posix.SOCK.NONBLOCK) catch |err| switch (err) {
            error.WouldBlock => {
                // This was added. We have to re-arm the listener.
                try self.loop.addListener(listener);
                return;
            }
            else => return err,
        };

        // ... same as before

    } else {
        // we no longer need to remove the listener if we're full
        // since the listener was added with ONESHOT and willy only notify
        // if it's re-added, which we do above
        // Remove this entire else block.
    }
}
{% endhighlight %}

<p>Whatever thread gets woke up needs to re-arm the listener after it's done accepting pending connections. As a small consolation prize for this extra system call, we no longer have to remove our listener when the server is full (because we're using <code>EV.ONESHOT</code>).</p>

<h3 id=platform_abstraction><a href="#platform_abstraction" aria-hidden=true>Platform Abstraction</a></h3>
<p>One of the things I like most about <code>epoll</code> and <code>kqueue</code> is that they're similar, so building an abstraction for their differences isn't too difficult. We already did most of the work in the previous parts when we extracted most of the logic to an <code>Epoll</code> and <code>KQueue</code> struct. One thing that we're still missing is an abstraction over our ready events. For example, in our <code>epoll</code> code, our loop looks like:</p>

{% highlight zig %}
const ready_events = self.loop.wait(next_timeout);
for (ready_events) |ready| {
    switch (ready.data.ptr) {
        0 => self.accept(listener) catch |err| log.err("failed to accept: {}", .{err}),
        else => |nptr| {
            const events = ready.events;
            const client: *Client = @ptrFromInt(nptr);

            if (events & linux.EPOLL.IN == linux.EPOLL.IN) {
             //...
            }

            //...
        },
    }
}
{% endhighlight %}

<p>Details like <code>data.ptr</code>, <code>ready.events</code> and <code>EPOLL.IN</code> need to be moved into our <code>Epoll</code> struct.</p>

<p>Before we solve that, you might be thinking that <a href="/Zig-Interfaces/">an interface</a> would be a good option. Alternatively, if you're more familiar with Zig, you might be thinking that a tagged union would make more sense, given that all implementation are known ahead of time.</p>

<p>In my opinion, a better option is to use comptime and set the implementation based on our build target:</p>

{% highlight zig %}
const Loop = switch (@import("builtin").os.tag) {
    .macos, .ios, .tvos, .watchos, .freebsd, .netbsd, .dragonfly, .openbsd => KQueue,
    .linux => EPoll,
    else => @panic("platform not supported, (yet??)"),
};
{% endhighlight %}

<p>Now we can replace any use of <code>Epoll</code> or <code>KQueue</code> with <code>Loop</code>.</p>

<p>Back to finalizing our abstraction. I think the best solution is to make <code>wait</code> return an <code>Iterator</code>. I'll provide the <code>epoll</code> implementation and leave it up to you to write it for <code>kqueue</code>:</p>

{% highlight zig %}
// This is what both the Epoll.Iterator and your KQueue.Iterator should yield
const Event = union(enum) {
    accept: void,
    read: *Client,
    write: *Client,
};

const Epoll = struct {
    efd: posix.fd_t,
    ready_list: [128]linux.epoll_event = undefined,

    // ...
    // everything is the same, except for wait
    // ...

    // we now return an Epoll.Iterator
    fn wait(self: *Epoll, timeout: i32) Iterator {
        const ready_list = &self.ready_list;
        const count = posix.epoll_wait(self.efd, ready_list, timeout);

        return .{
            .index = 0,
            .ready_list = ready_list[0..count],
        };
    }

    const Iterator = struct {
        index: usize,
        ready_list: []linux.epoll_event,

        fn next(self: *Iterator) ?Event {
            const index = self.index;
            const ready_list = self.ready_list;
            if (index == ready_list.len) {
                return null;
            }

            self.index = index + 1;
            const ready = ready_list[index];
            switch (ready.data.ptr) {
                0 => return .{ .accept = {} },
                else => |nptr| {
                    const client: *Client = @ptrFromInt(nptr);
                    if (ready.events & linux.EPOLL.IN == linux.EPOLL.IN) {
                        return .{.read = client};
                    }
                    return .{.write = client};
                }
            }
        }
    };
};
{% endhighlight %}

<p>We use a tagged union, named <code>Event</code>, to represent the type of readiness our system handles. This can be expanded to support other type, like <code>shutdown</code>. Using this tagged union not only provides a platform-agnostic target for our platform-specific iterators, but it also cleans up our accept loop:</p>

{% highlight zig %}
while (true) {
    const next_timeout = self.enforceTimeout();
    var it = self.loop.wait(next_timeout);
    while (it.next()) |ready| {
        switch (ready) {
            .accept => self.accept(listener) catch |err| log.err("failed to accept: {}", .{err}),
            .read => |client| {
                while (true) {
                    // our readMessage Loop...
                }
            },
            .write => |client| client.write() catch self.closeClient(client),
        }
    }
}
{% endhighlight %}

<p>We'll need other change to make the two structures compatible. For example, <code>Epoll.wait</code> can't fail, while <code>KQueue.wait</code> can; the above return type needs to change to <code>!Iterator</code>.</p>

<h3 id=conclusion><a href="#conclusion" aria-hidden=true>Conclusion</a></h3>
<p>There are various patterns and techniques that can be weaved under, above or into the fabric of a TCP server. Hopefully this series has given you a reasonable overview as well as usable code. It probably goes without saying that a lot more could be written about these topic. I do want to highlight that both <code>epoll</code> and <code>kqueue</code> support additional features, like timers. If you're using either, read their documentation thoroughly.</p>

<p>If you're only expecting a handful of clients, consider the thread-per-connection approach. It isn't flashy, but it's simpler to implement and has fewer edge-cases.</p>

<p>Whatever number of clients you're expecting, don't trust them. Obviously a service opened to the public internet needs to be mindful of malicious actors, but even in a closed system, bugs happen. As a basic example, if you're using length-prefixed message and are allocating dynamic memory for large messages, check that the length against a <code>MAX_LENGTH</code> value first. Finally, micro-optimize your code. It's how you'll learn and, if you're dealing with a lot of clients, small changes add up.</p>

<div class=pager>
  <a class=prev href=/TCP-Server-In-Zig-Part-7-Kqueue/>Kqueue</a>
  <span class=next></span>
</div>

